package cn.org.wangchangjiu.redis.delay;

import com.alibaba.fastjson.JSON;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.MethodIntrospector;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.util.CollectionUtils;
import org.springframework.util.concurrent.ListenableFutureCallback;

import javax.annotation.PostConstruct;
import java.lang.reflect.Method;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Consumer side of the Redis delay queue.
 *
 * <p>As a {@link BeanPostProcessor} it scans every initialized bean for methods annotated with
 * {@link RedisDelayMessageListener} and registers one consumer container per topic. As an
 * {@link ApplicationRunner} it then starts two background loops: a "boss" loop that periodically
 * hands per-topic fetch tasks to a worker pool, and a "back" loop that periodically runs the
 * ack check on the delay queue.
 *
 * <p>Call {@link #shutdown()} to stop the background loops.
 *
 * @author wangchangjiu
 * @since 2022/9/14
 */
@Slf4j
public class DelayQueueMessageConsumer implements BeanPostProcessor, ApplicationRunner {

    /** Consumer container per topic; a later registration for the same topic replaces the earlier one. */
    private final Map<String, RedisDelayMessageConsumerContainer> consumerContainerGroups = new HashMap<>();

    private final DelayQueue delayQueue;

    private final String registerService;

    private final RedisDelayProperties redisDelayProperties;

    /** Every topic that has at least one registered listener method. */
    private final Set<String> topics = new HashSet<>();

    /** Type of the first parameter of the listener method registered for each topic. */
    private final Map<String, Class> topicParamTypeMap = new HashMap<>();

    /**
     * Boss thread: submits the per-topic fetch tasks, sleeping
     * {@code redisDelayProperties.getBoosThreadSleep()} between rounds.
     */
    ExecutorService bossExecutorService = Executors.newSingleThreadExecutor();

    /**
     * Worker threads: run the tasks that fetch delayed messages.
     */
    ExecutorService workerExecutorService = Executors.newCachedThreadPool();

    /**
     * Background thread: periodic ack check.
     */
    ExecutorService backExecutorService = Executors.newSingleThreadExecutor();

    /**
     * @param delayQueue           queue the messages are fetched from and acknowledged on
     * @param redisDelayProperties configuration; supplies the register-service name and the
     *                             sleep intervals of the two background loops
     */
    public DelayQueueMessageConsumer(DelayQueue delayQueue,
                                     RedisDelayProperties redisDelayProperties) {
        this.delayQueue = delayQueue;
        this.redisDelayProperties = redisDelayProperties;
        this.registerService = redisDelayProperties.getRegisterService();
    }

    /**
     * Registers every {@link RedisDelayMessageListener}-annotated method of {@code bean} as the
     * consumer of its topic.
     *
     * @param bean     the fully initialized bean
     * @param beanName the bean's name (unused)
     * @return {@code bean}, unchanged
     */
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) {
        // Look at the target class so annotations are still found when the bean is an AOP proxy.
        Class<?> targetClass = AopUtils.getTargetClass(bean);
        Map<Method, RedisDelayMessageListener> listenerMethods = MethodIntrospector.selectMethods(targetClass,
                (MethodIntrospector.MetadataLookup<RedisDelayMessageListener>) method ->
                        AnnotationUtils.findAnnotation(method, RedisDelayMessageListener.class));
        if (CollectionUtils.isEmpty(listenerMethods)) {
            return bean;
        }

        for (Map.Entry<Method, RedisDelayMessageListener> entry : listenerMethods.entrySet()) {
            Method listenerMethod = entry.getKey();
            RedisDelayMessageListener listener = entry.getValue();
            String topic = listener.topic();

            RedisDelayMessageConsumerContainer replaced = consumerContainerGroups.put(topic,
                    new RedisDelayMessageConsumerContainer(bean, listenerMethod, listener));
            if (replaced != null) {
                // Last registration wins; make the silent override visible.
                log.warn("topic :{} has more than one listener, the earlier one is replaced by {}#{}",
                        topic, targetClass.getName(), listenerMethod.getName());
            }
            topics.add(topic);

            Class<?>[] parameterTypes = listenerMethod.getParameterTypes();
            if (parameterTypes.length > 0) {
                // The first parameter's type is passed to the container on invocation.
                topicParamTypeMap.put(topic, parameterTypes[0]);
            } else {
                // Drop a type left behind by a listener that has just been replaced.
                topicParamTypeMap.remove(topic);
            }
        }
        return bean;
    }

    /**
     * Starts the boss loop and the ack-check loop on their dedicated threads.
     *
     * @param args application arguments (unused)
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        bossExecutorService.submit(this::runBossLoop);
        backExecutorService.submit(this::runAckCheckLoop);
        log.info("(Redis延迟队列启动成功)");
    }

    /**
     * Stops the background loops. The boss and ack-check threads are interrupted; tasks already
     * handed to the worker pool are allowed to finish. Safe to call more than once.
     *
     * <p>NOTE(review): if this bean is registered through an {@code @Bean} method with default
     * settings, Spring's destroy-method inference should invoke this method on context close;
     * otherwise it has to be called explicitly. Confirm against the auto-configuration.
     */
    public void shutdown() {
        bossExecutorService.shutdownNow();
        backExecutorService.shutdownNow();
        workerExecutorService.shutdown();
    }

    /** Boss thread body: one initial batch fetch, then the periodic single-message fetch loop. */
    private void runBossLoop() {
        try {
            oneTimeBatchGetMessages();
        } catch (RuntimeException e) {
            // A failed initial batch must not prevent the periodic loop from starting.
            log.error(e.getMessage(), e);
        }
        doCycleGetMessage();
    }

    /** Back thread body: runs the ack check, sleeps, repeats until interrupted. */
    private void runAckCheckLoop() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                delayQueue.checkAck(this.registerService);
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                    return;
                }
                // Keep the loop alive; otherwise one failure would silently end ack checking.
                log.error(e.getMessage(), e);
            }

            try {
                Thread.sleep(redisDelayProperties.getBackThreadSleep());
            } catch (InterruptedException e) {
                // Restore the flag and stop: interruption is the shutdown signal.
                Thread.currentThread().interrupt();
                return;
            } catch (RuntimeException e) {
                log.error(e.getMessage(), e);
            }
        }
    }

    /**
     * Submits one batch-fetch task per topic to the worker pool; every message a task returns
     * is dispatched to its listener.
     */
    private void oneTimeBatchGetMessages() {
        for (String topic : this.topics) {
            workerExecutorService.execute(new BatchDelayMessageTaskExecutor(this.delayQueue, this.registerService, topic,
                    new ListenableFutureCallback<List<RedisDelayMessage>>() {
                        @Override
                        public void onFailure(Throwable ex) {
                            log.error("BatchDelayMessageTaskExecutor get message error, topic:{}", topic, ex);
                        }

                        @Override
                        public void onSuccess(List<RedisDelayMessage> result) {
                            if (CollectionUtils.isEmpty(result)) {
                                return;
                            }
                            for (RedisDelayMessage message : result) {
                                doInvokeMessage(message);
                            }
                        }
                    }));
        }
    }

    /**
     * Invokes the listener registered for the message's topic and, when the container is
     * configured for auto-ack, acknowledges the message afterwards.
     *
     * @param redisDelayMessage the delayed message to deliver
     */
    private void doInvokeMessage(RedisDelayMessage redisDelayMessage) {
        String topic = redisDelayMessage.getTopic();
        RedisDelayMessageConsumerContainer consumerContainer = consumerContainerGroups.get(topic);
        if (consumerContainer == null) {
            log.error("topic :{} consumer not found", topic);
            return;
        }

        consumerContainer.invoke(redisDelayMessage, topicParamTypeMap.get(topic), delayQueue);
        if (consumerContainer.getAutoAck()) {
            delayQueue.ackMessage(redisDelayMessage.getRegisterService(), topic, redisDelayMessage.getMessageId());
        }
        log.info("--延迟队列中间件...topic:{}, 获取延迟消息：{}, 发送执行业务--", topic, redisDelayMessage);
    }

    /**
     * Periodic fetch loop: each round submits one single-message fetch task per topic to the
     * worker pool, then sleeps. Ends when the thread is interrupted or the worker pool has been
     * shut down.
     */
    private void doCycleGetMessage() {
        while (!Thread.currentThread().isInterrupted() && !workerExecutorService.isShutdown()) {
            try {
                for (String topic : this.topics) {
                    workerExecutorService.execute(new DelayMessageTaskExecutor(this.delayQueue, this.registerService, topic,
                            new ListenableFutureCallback<RedisDelayMessage>() {
                                @Override
                                public void onFailure(Throwable ex) {
                                    log.error("DelayMessageTaskExecutor get message error, topic:{}", topic, ex);
                                }

                                @Override
                                public void onSuccess(RedisDelayMessage result) {
                                    if (result == null) {
                                        return;
                                    }
                                    doInvokeMessage(result);
                                }
                            }));
                }
            } catch (RuntimeException e) {
                // A failed round (e.g. a rejected task) must not end polling for good.
                log.error(e.getMessage(), e);
            }

            try {
                Thread.sleep(redisDelayProperties.getBoosThreadSleep());
            } catch (InterruptedException e) {
                // Restore the flag and stop: interruption is the shutdown signal.
                Thread.currentThread().interrupt();
                return;
            } catch (RuntimeException e) {
                log.error(e.getMessage(), e);
            }
        }
    }
}
